# [十二] Spring的事务管理 - 声明式事务

# Spring的事务管理

导读

在Spring中,事务也是用 AOP 切面技术来实现的,有两种实现方式:

    1. 声明式事务管理: 基于Spring AOP实现。其本质是对方法前后进行拦截,然后在目标方法开始之前创建或者加入一个事务,在执行完目标方法之后根据执行情况提交或者回滚事务。
    1. 编程式事务管理: 编程式事务管理使用TransactionTemplate可实现更细粒度的事务控制。

声明式事务管理不需要入侵代码,通过@Transactional就可以进行事务操作,更快捷而且简单且大部分业务都可以满足,推荐使用。

其实不管是编程式事务还是声明式事务,最终调用的底层核心代码是一致的。

# 声明式事务管理

1、 首先创建一个配置类,通过@EnableTransactionManagement开启事务管理功能

@Component
@EnableTransactionManagement(proxyTargetClass = false)
@MapperScan(basePackages = {"com.xiangxue.jack.dao"},annotationClass = Repository.class)
public class EnableTransactionManagementBean {

   /*
    *  以下两个@Bean的配置都可以设置数据源
    *  通过SqlSessionFactoryBean设置数据源
    */
    @Bean
    public SqlSessionFactoryBean sqlSessionFactoryBean(DataSource dataSource) {
        SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
        sqlSessionFactoryBean.setDataSource(dataSource);
        return sqlSessionFactoryBean;
    }

   /*
    *  通过事务管理平台的方式设置数据源
    */
    @Bean
    public PlatformTransactionManager annotationDrivenTransactionManager(DataSource dataSource) {
        DataSourceTransactionManager dtm = new DataSourceTransactionManager();
        dtm.setDataSource(dataSource);
        return dtm;
    }

}

2、定义数据源

@Configuration
@PropertySource("classpath:config/core/core.properties")
public class DataSourceConfiguration {

    @Value("${jdbc.driverClassName}")
    private String driverClass;
    @Value("${jdbc.url:jdbc}")
    private String jdbcUrl;
    @Value("${jdbc.username}")
    private String user;
    @Value("${jdbc.password}")
    private String password;

    @Bean
    public DataSource comboPooledDataSource() {
        ComboPooledDataSource comboPooledDataSource = new ComboPooledDataSource();
        try {
            comboPooledDataSource.setDriverClass(driverClass);
            comboPooledDataSource.setJdbcUrl(jdbcUrl);
            comboPooledDataSource.setUser(user);
            comboPooledDataSource.setPassword(password);
            comboPooledDataSource.setMinPoolSize(10);
            comboPooledDataSource.setMaxPoolSize(100);
            comboPooledDataSource.setMaxIdleTime(1800);
            comboPooledDataSource.setAcquireIncrement(3);
            comboPooledDataSource.setMaxStatements(1000);
            comboPooledDataSource.setInitialPoolSize(10);
            comboPooledDataSource.setIdleConnectionTestPeriod(60);
            comboPooledDataSource.setAcquireRetryAttempts(30);
            comboPooledDataSource.setBreakAfterAcquireFailure(false);
            comboPooledDataSource.setTestConnectionOnCheckout(false);
            comboPooledDataSource.setAcquireRetryDelay(100);
        } catch (PropertyVetoException e) {
            e.printStackTrace();
        }

        return comboPooledDataSource;
    }
}
jdbc.driverClassName = org.gjt.mm.mysql.Driver
jdbc.url = jdbc:mysql://127.0.0.1:3306/consult
jdbc.username = root
jdbc.password = 123456

知识点

数据源和事务管理平台的加载都是在类ProxyTransactionManagementConfiguration中进行的。Spring中的事务就是由 connection对象控制的,所以connection对象是和事务是绑定的,然后用户请求过来时又需要数据库连接来执行 sql 语句, 所以用户请求线程又是跟 connection 是绑定的。

# 开启事务注解解析流程

首先看以下@EnableTransactionManagement注解的源码

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(TransactionManagementConfigurationSelector.class)
public @interface EnableTransactionManagement {
	// proxyTargetClass = false 表示是JDK动态代理支持接口代理。
	// true表示是Cglib代理支持子类继承代理。
	boolean proxyTargetClass() default false;
	// 事务通知模式(切面织入方式),默认代理模式(同一个类中方法互相调用拦截器不会生效)
	AdviceMode mode() default AdviceMode.PROXY;
	// 连接点上有多个通知时,排序,默认最低。值越大优先级越低。
	int order() default Ordered.LOWEST_PRECEDENCE;

}

进入TransactionManagementConfigurationSelector

public class TransactionManagementConfigurationSelector extends AdviceModeImportSelector<EnableTransactionManagement> {

	@Override
	protected String[] selectImports(AdviceMode adviceMode) {
		/**
		 *  TODO : 导入需要加载的类
		 *  1、AutoProxyRegistrar: 给容器中注册一个InfrastructureAdvisorAutoProxyCreator(Spring事务入口类) 组件;
		 *  利用后置处理器机制在对象创建以后,包装对象,返回一个代理对象(增强器),
		 *  代理对象执行方法利用拦截器链进行调用;
		 *
		 *  2、ProxyTransactionManagementConfiguration: 就是一个配置类,定义了事务增强器、拦截器。
		 *
		 */
		switch (adviceMode) {
			case PROXY:
				// 重点看这2个类:
				// AutoProxyRegistrar
				// ProxyTransactionManagementConfiguration
				return new String[] {AutoProxyRegistrar.class.getName(),
						ProxyTransactionManagementConfiguration.class.getName()};
			case ASPECTJ:
				return new String[] {determineTransactionAspectClass()};
			default:
				return null;
		}
	}

	private String determineTransactionAspectClass() {
		return (ClassUtils.isPresent("javax.transaction.Transactional", getClass().getClassLoader()) ?
				TransactionManagementConfigUtils.JTA_TRANSACTION_ASPECT_CONFIGURATION_CLASS_NAME :
				TransactionManagementConfigUtils.TRANSACTION_ASPECT_CONFIGURATION_CLASS_NAME);
	}

}

那么这个@EnableTransactionManagement开启事务管理配置的注解是如何被调用的呢?我们回顾一下Spring的初始化核心流程

# Spring 初始化核心流程回顾

spring容器初始化的核心方法AbstractApplicationContext#refresh ,

  • ├─ refresh Spring 初始化核心流程入口
  • │ ├─ prepareRefresh ① 准备此上下文用于刷新,设置启动时间和active标志,初始化属性
  • │ ├─ obtainFreshBeanFactory ② 创建 BeanFactory
  • │ ├─ prepareBeanFactory ③ 设置 BeanFactory 的基本属性
  • │ ├─ postProcessBeanFactory ④ 子类处理自定义的BeanFactoryPostProcess
  • │ ├─ invokeBeanFactoryPostProcessors ⑤ 开启事务管理配置的调用入口
  • │ ├─ registerBeanPostProcessors ⑥ 注册拦截Bean创建的Bean处理器
  • │ ├─ initMessageSource ⑦ 初始化上下文中的资源文件,如国际化文件的处理等
  • │ ├─ initApplicationEventMulticaster ⑧ 初始化上下文的事件传播器
  • │ ├─ onRefresh ⑨ 给子类扩展初始化其他Bean,springboot 中用来做内嵌 tomcat 启动
  • │ ├─ registerListeners ⑩ 在所有bean中查找监听 bean,然后注册到广播器中
  • │ ├─ finishBeanFactoryInitialization ⑪ 初始化所有的单例Bean、ioc、BeanPostProcessor的执行、Aop入口
  • │ └─ finishRefresh ⑫ 完成刷新过程,发布相应的事件

进入invokeBeanFactoryPostProcessors() 方法

类文件: org.springframework.beans.factory.support.AbstractApplicationContext

protected void invokeBeanFactoryPostProcessors(ConfigurableListableBeanFactory beanFactory) {
// 看这个 invokeBeanFactoryPostProcessors 方法
PostProcessorRegistrationDelegate.invokeBeanFactoryPostProcessors(beanFactory, getBeanFactoryPostProcessors());

		if (beanFactory.getTempClassLoader() == null && beanFactory.containsBean(LOAD_TIME_WEAVER_BEAN_NAME)) {
			beanFactory.addBeanPostProcessor(new LoadTimeWeaverAwareProcessor(beanFactory));
			beanFactory.setTempClassLoader(new ContextTypeMatchClassLoader(beanFactory.getBeanClassLoader()));
		}
	}

进入invokeBeanFactoryPostProcessors() 方法

类文件: org.springframework.beans.factory.support.PostProcessorRegistrationDelegate

public static void invokeBeanFactoryPostProcessors(
			ConfigurableListableBeanFactory beanFactory, List<BeanFactoryPostProcessor> beanFactoryPostProcessors) {
			
   /** ...... 省略 ......  **/
   /**
    * 调用过程
    * 在这里典型的 BeanDefinitionRegistryPostProcessor 就是 ConfigurationClassPostProcessor
    * 用于进行bean定义的加载 比如我们的包扫描,@import等
    * @EnableAspectJAutoProxy 注解中的 AspectJAutoProxyRegistrar 类就是通过此方法调用执行
    * @EnableTransactionManagement 注解中的 TransactionManagementConfigurationSelector 类就是通过此方法调用执行
    */
	invokeBeanDefinitionRegistryPostProcessors(currentRegistryProcessors, registry);
	// 调用完之后,马上clear
	currentRegistryProcessors.clear();			
}

进入invokeBeanDefinitionRegistryPostProcessors() 方法

类文件: org.springframework.beans.factory.support.PostProcessorRegistrationDelegate

private static void invokeBeanDefinitionRegistryPostProcessors(
			Collection<? extends BeanDefinitionRegistryPostProcessor> postProcessors, BeanDefinitionRegistry registry) {

		for (BeanDefinitionRegistryPostProcessor postProcessor : postProcessors) {
			// 重点看这个方法
			postProcessor.postProcessBeanDefinitionRegistry(registry);
		}
	}

进入postProcessBeanDefinitionRegistry() 方法

所在类 org.springframework.context.annotation.ConfigurationClassPostProcessor

public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) {
		int registryId = System.identityHashCode(registry);
		if (this.registriesPostProcessed.contains(registryId)) {
			throw new IllegalStateException(
					"postProcessBeanDefinitionRegistry already called on this post-processor against " + registry);
		}
		if (this.factoriesPostProcessed.contains(registryId)) {
			throw new IllegalStateException(
					"postProcessBeanFactory already called on this post-processor against " + registry);
		}
		this.registriesPostProcessed.add(registryId);
		// 重点看这个方法
		processConfigBeanDefinitions(registry);
	}

知识点

ConfigurationClassPostProcessor类非常重要,此类是对Spring容器中所有的BeanDefinition的新增、修改等操作,也就是说对BeanDefinition的增删查改都会在ConfigurationClassPostProcessor类中进行。同时还支持了@Import、@ImportResource、@ComponetScan、@Configruration等注解的解析工作。

进入processConfigBeanDefinitions() 方法,这个时候bean还没有实例化

类文件: org.springframework.beans.factory.support.ConfigurationClassPostProcessor

public void processConfigBeanDefinitions(BeanDefinitionRegistry registry) {
	 /** ...... 省略 ......  **/
	do {
			// 进入parse方法
			parser.parse(candidates);
			parser.validate();
	}
	while (!candidates.isEmpty());
}

接下来的调用的过程较为繁琐,此处省略无关的源码,按照如下步骤自行跟踪源码即可

进入parse方法,

进入子parse方法,

进入processConfigurationClass方法,

进入doProcessConfigurationClass方法,

最后进入processImports 方法,

类文件: org.springframework.beans.factory.support.ConfigurationClassParser 到此处,selectImports()方法最终会被ConfigurationClassParser类的processImports()方法调用执行,具体见第ConfigurationClassParser类的第569行,

private void processImports(ConfigurationClass configClass, SourceClass currentSourceClass,
			Collection<SourceClass> importCandidates, boolean checkForCircularImports) {
	 /** ...... 省略 ......  **/
	if (candidate.isAssignable(ImportSelector.class)) {
	// Candidate class is an ImportSelector -> delegate to it to determine imports
	Class<?> candidateClass = candidate.loadClass();
	// 反射拿到 ImportSelector 接口类的实例化对象 selector
	ImportSelector selector = BeanUtils.instantiateClass(candidateClass, ImportSelector.class);
	ParserStrategyUtils.invokeAwareMethods(
			selector, this.environment, this.resourceLoader, this.registry);
	if (selector instanceof DeferredImportSelector) {
		this.deferredImportSelectorHandler.handle(
				configClass, (DeferredImportSelector) selector);
	}
	else {
	/*
	 * 然后通过 selector调用其实现类 AdviceModeImportSelector
	 * 的selectImports方法,模板中的钩子方法。最后拿到返回的
	 * TransactionManagementConfigurationSelector的selectImports
	 * 方法返回的两个类:
	 *  1、AutoProxyRegistrar.class
	 *  2、ProxyTransactionManagementConfiguration.class
     *  继续递归调用当前的 processImports 方法
     */
		String[] importClassNames = selector.selectImports(currentSourceClass.getMetadata());
		Collection<SourceClass> importSourceClasses = asSourceClasses(importClassNames);
		processImports(configClass, currentSourceClass, importSourceClasses, false);
	}
}
   
    }

# 开启事务的入口类

进入selectImports()方法,找到其实现类AdviceModeImportSelectorselectImports方法

类文件: org.springframework.context.annotation.AdviceModeImportSelector

public final String[] selectImports(AnnotationMetadata importingClassMetadata) {
		Class<?> annType = GenericTypeResolver.resolveTypeArgument(getClass(), AdviceModeImportSelector.class);
		Assert.state(annType != null, "Unresolvable type argument for AdviceModeImportSelector");

		AnnotationAttributes attributes = AnnotationConfigUtils.attributesFor(importingClassMetadata, annType);
		if (attributes == null) {
			throw new IllegalArgumentException(String.format(
					"@%s is not present on importing class '%s' as expected",
					annType.getSimpleName(), importingClassMetadata.getClassName()));
		}

		AdviceMode adviceMode = attributes.getEnum(getAdviceModeAttributeName());
		// 此方法最终会调用TransactionManagementConfigurationSelector
		// 的 selectImports方法
		String[] imports = selectImports(adviceMode);
		if (imports == null) {
			throw new IllegalArgumentException("Unknown AdviceMode: " + adviceMode);
		}
		return imports;
	}

调回到selectImports方法后,会把AutoProxyRegistrarProxyTransactionManagementConfiguration两个类注册到Beandefinition

public class TransactionManagementConfigurationSelector extends AdviceModeImportSelector<EnableTransactionManagement> {
	@Override
	protected String[] selectImports(AdviceMode adviceMode) {
		/**
		 *  TODO : 导入需要加载的类
		 *  1、AutoProxyRegistrar: 给容器中注册一个InfrastructureAdvisorAutoProxyCreator 组件;
		 *  利用后置处理器机制在对象创建以后,包装对象,返回一个代理对象(增强器),
		 *  代理对象执行方法利用拦截器链进行调用;
		 *
		 *  2、ProxyTransactionManagementConfiguration: 就是一个配置类,定义了事务增强器、拦截器。
		 *
		 */
		switch (adviceMode) {
			case PROXY:
				// 重点看这2个类:
				// AutoProxyRegistrar
				// ProxyTransactionManagementConfiguration
				return new String[] {AutoProxyRegistrar.class.getName(),
						ProxyTransactionManagementConfiguration.class.getName()};
			case ASPECTJ:
				return new String[] {determineTransactionAspectClass()};
			default:
				return null;
		}
	}
}

# 事务核心类 - AutoProxyRegistrar 源码

public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
		boolean candidateFound = false;
		// 循环遍历导入类上使用的所有注解(主要处理注解中有 mode和 proxyTargetClass属性的)
		Set<String> annoTypes = importingClassMetadata.getAnnotationTypes();
		for (String annoType : annoTypes) {
			AnnotationAttributes candidate = AnnotationConfigUtils.attributesFor(importingClassMetadata, annoType);
			if (candidate == null) {
				continue;
			}
			// 获取注解中的 mode 属性
			Object mode = candidate.get("mode");
			// 获取注解中的 proxyTargetClass 属性
			Object proxyTargetClass = candidate.get("proxyTargetClass");
			if (mode != null && proxyTargetClass != null && AdviceMode.class == mode.getClass() &&
					Boolean.class == proxyTargetClass.getClass()) {
				candidateFound = true;
				// 如果 mode 是代理模式
				if (mode == AdviceMode.PROXY) {
					// 注册事务AOP的入口类 InfrastructureAdvisorAutoProxyCreator,实际上这个AOP入口类起不了作用
					AopConfigUtils.registerAutoProxyCreatorIfNecessary(registry);
					if ((Boolean) proxyTargetClass) {
						AopConfigUtils.forceAutoProxyCreatorToUseClassProxying(registry);
						return;
					}
				}
			}
		}
	
	}

# 事务核心类 - ProxyTransactionManagementConfiguration 源码

@Configuration
public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration {

	@Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME)
	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
	//
	/**
	 * TODO : 定义事务增强器(事务织入)
	 *
	 * 定义了一个 advisor,设置事务属性、设置事务拦截器 TransactionInterceptor、设置顺序。
	 * 核心就是事务拦截器 TransactionInterceptor。
	 */
	public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor() {
		BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor();
		advisor.setTransactionAttributeSource(transactionAttributeSource());
		advisor.setAdvice(transactionInterceptor());
		if (this.enableTx != null) {
			advisor.setOrder(this.enableTx.<Integer>getNumber("order"));
		}
		return advisor;
	}

	@Bean
	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
	// 主要负责解析 @Transactional注解里的属性资源
	// 并包装为 TransactionAttribute 类型
	public TransactionAttributeSource transactionAttributeSource() {
		return new AnnotationTransactionAttributeSource();
	}

	@Bean
	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
	// 定义事务拦截器
	public TransactionInterceptor transactionInterceptor() {
		// TransactionInterceptor 实现了 MethodInterceptor 接口,
		// 在链式调用中会自动调用它的 invoke 方法
		TransactionInterceptor interceptor = new TransactionInterceptor();
		// 设置 TransactionAttribute 属性
		interceptor.setTransactionAttributeSource(transactionAttributeSource());
		// 如果当前事务管理器不为空,设置当前事务管理器到拦截器中
		// 事务管理器和数据源绑定,此处可以自定义
		if (this.txManager != null) {
			interceptor.setTransactionManager(this.txManager);
		}
		return interceptor;
	}

}

知识点

此处可以参考父类AbstractTransactionManagementConfiguration中的setConfigurers方法,通过重写TransactionManagementConfigurerannotationDrivenTransactionManager方法来自定义一个事务管理器。

@Component
public class TransactionManagementConfigurerBean implements TransactionManagementConfigurer {
    @Autowired
    private DataSource dataSource;
    @Override
    public PlatformTransactionManager annotationDrivenTransactionManager() {
        DataSourceTransactionManager dtm = new DataSourceTransactionManager();
        dtm.setDataSource(dataSource);
        return dtm;
    }
}

到此为止,Spring中@EnableTransactionManagement注解的内部注册解析的准备工作完成。

# 事务拦截器

TransactionInterceptor

Spring声明式事物是基于AOP实现的,如果目标方法存在事物则会对目标对象进行增强代理(JDK/Cglib)。而TransactionInterceptor则是事物体系中的增强器(Advise),它实现了MethodInterceptor接口,TransactionInterceptor会被封装在使用了事务注解@Transactional的bean组件外面形成该组件的代理对象,当调用相应使用事务注解的方法时,TransactionInterceptorinvoke方法拦截器逻辑会被调用执行,从而完成相应的事务管理,包括:创建、提交和回滚等底层操作。

# Spring 事务拦截处理流程

spring事务拦截器解析流程TransactionInterceptor#invoke

  • ├─ invoke Spring 事务拦截器入口
  • ├─ invokeWithinTransaction 调用Spring内部事务
  • │ ├─ getTransactionAttributeSource ① 通过事务属性源读取事务的属性配置
  • │ ├─ getTransactionAttribute ② 获取该方法的事务属性(传播机制、隔离级别)
  • │ ├─ determineTransactionManager ③ 查找容器中的PlatformTransactionManager,用于管理事务
  • │ ├─ methodIdentification ④ 获取被代理的类方法名称
  • │ ├─ createTransactionIfNecessary ⑤ 创建事务并保存到TransactionInfo 声明式事务
  • │ ├─ proceedWithInvocation ⑥ 火炬传递方法,拦截器链调用处理
  • │ ├─ completeTransactionAfterThrowing ⑦ 执行业务报错,回滚事务
  • │ ├─ cleanupTransactionInfo ⑧ 清空当前事务信息,重置为老的
  • │ ├─ commitTransactionAfterReturning ⑨ 返回结果之前提交事务
  • │ ├─ execute ① 执行实现TransactionCallback接口的doInTransaction回调方法 编程式事务
  • │ │ ├─ prepareTransactionInfo ② 准备事务信息
  • │ │ │ ├─ new TransactionInfo ③ 创建事务信息对象
  • │ │ │ ├─ newTransactionStatus ④ 为事务信息对象设置事务状态
  • │ │ │ └─ bindToThread ⑤ 把创建的事务信息对象和线程绑定到TreadLocal
  • │ │ ├─ proceedWithInvocation ⑥ 火炬传递方法, 拦截器链调用处理
  • │ │ ├─ rollbackOn ⑦ 事务是否满足对异常进行回滚处理条件
  • │ │ └─ cleanupTransactionInfo ⑧ 初始化所有的单例Bean、ioc、BeanPostProcessor的执行、Aop入口

进入invoke 方法,

类文件: org.springframework.transaction.interceptor.TransactionInterceptor

	/**
	 *  TODO : 事务拦截器的拦截方法
	 * 事务拦截器TransactionInterceptor回调方法invoke通过调用TransactionAspectSupport事务切面支持类中的
	 * createTransactionIfNecessary和prepareTransactionInfo方法创建事务对象
	 */
	public Object invoke(MethodInvocation invocation) throws Throwable {
		// Work out the target class: may be {@code null}.
		// The TransactionAttributeSource should be passed the target class
		// as well as the method, which may be from an interface.
		// 通过 AOP获取事务的目标类
		Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

		// Adapt to TransactionAspectSupport's invokeWithinTransaction...
		// 核心方法,调用内部事务处理,实际调用父类 TransactionAspectSupport的 invokeWithinTransaction方法
		return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
	}

进入invokeWithinTransaction 方法,父类中实现

类文件: org.springframework.transaction.interceptor.TransactionAspectSupport

/**
 * TODO : 调用内部事务
 */
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
			final InvocationCallback invocation) throws Throwable {

		// If the transaction attribute is null, the method is non-transactional.
		// 通过事务属性源 TransactionAttributeSource读取事务的属性配置(名称匹配)
		TransactionAttributeSource tas = getTransactionAttributeSource();
		// 获取对应事务属性.如果事务属性为空(则目标方法不存在事务)
		// 事务属性源 NameMatchTransactionAttributeSource 的方法
		final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
		// 根据事务的属性获取 beanFactory 中的 PlatformTransactionManager (spring事务管理器的顶级接口),
		// 获取 Spring事务管理IoC容器配置的事务处理器
		// 一般常用实现是 DataSourceTransactiuonManager
		final PlatformTransactionManager tm = determineTransactionManager(txAttr);
		// 目标方法唯一标识(类.方法,如service.ServiceImpl.add)
		// 获取目标类指定方法的事务连接点
		final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
		/*
		 * 区分不同类型的 PlatformTransactionManager事务处理器,不同类型的事务处理器调用方式不同。对
		 * CallbackPreferringPlatformTransactionManager,需要回调函数来实现事务的创建和提交,对非
		 * CallbackPreferringPlatformTransactionManager来说,则不需要使用回调函数来实现事务处理。
		 */
		// 如果txAttr为空或者tm 属于非CallbackPreferringPlatformTransactionManager类型的事务处理器
		// 声明式事务的操作
		if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
			// Standard transaction demarcation with getTransaction and commit/rollback calls.
			// 使用 getTransaction 和 提交/回滚 调用的标准事务划分。
			// 创建事务,将当前事务状态和信息保存到TransactionInfo对象中,重要
			TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
			Object retVal = null;
			try {
				// This is an around advice: Invoke the next interceptor in the chain.
				// This will normally result in a target object being invoked.
				// 这里就是一个环绕增强,在这个proceed前后可以自己定义增强实现。
				// 回调方法执行,执行目标方法(原有的业务逻辑)
				retVal = invocation.proceedWithInvocation();
			}
			catch (Throwable ex) {
				// target invocation exception
				// 根据事务定义的,该异常需要回滚就回滚,否则提交事务
				completeTransactionAfterThrowing(txInfo, ex);
				throw ex;
			}
			finally {
				// 清空当前事务信息,重置为老的
				cleanupTransactionInfo(txInfo);
			}
			// 返回结果之前提交事务
			commitTransactionAfterReturning(txInfo);
			return retVal;
		}
		// 编程式事务
		// CallbackPreferringPlatformTransactionManager类型的事务处理器
		else {
			final ThrowableHolder throwableHolder = new ThrowableHolder();

			// It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
			try {
				// 执行实现T ransactionCallback 接口的 doInTransaction 回调方法
				Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr, status -> {
					TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
					try {
						// 拦截器链调用处理,使得最后目标对象的方法得到调用
						return invocation.proceedWithInvocation();
					}
					catch (Throwable ex) {
						// 如果事务满足对异常进行回滚处理条件
						if (txAttr.rollbackOn(ex)) {
							// A RuntimeException: will lead to a rollback.
							// 如果异常是运行时异常,则事务回滚处理
							if (ex instanceof RuntimeException) {
								throw (RuntimeException) ex;
							}
							//如果不是运行时异常,则提交处理
							else {
								throw new ThrowableHolderException(ex);
							}
						}
						else {
							// A normal return value: will lead to a commit.
							// 提交处理
							throwableHolder.throwable = ex;
							return null;
						}
					}
					//清除当前线程绑定的事务信息
					finally {
						cleanupTransactionInfo(txInfo);
					}
				}); // 到这里都是execute方法

				// Check result state: It might indicate a Throwable to rethrow.
				// 如果是ThrowableHolder类型的异常,则转换为Throwable抛出,上抛异常
				if (throwableHolder.throwable != null) {
					throw throwableHolder.throwable;
				}
				// 否则异常不做处理直接抛出
				return result;
			}
			catch (ThrowableHolderException ex) {
				throw ex.getCause();
			}
			catch (TransactionSystemException ex2) {
				if (throwableHolder.throwable != null) {
					logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
					ex2.initApplicationException(throwableHolder.throwable);
				}
				throw ex2;
			}
			catch (Throwable ex2) {
				if (throwableHolder.throwable != null) {
					logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
				}
				throw ex2;
			}
		}
	}

进入getTransactionAttribute 方法,父类中实现

类文件: org.springframework.transaction.interceptor.AbstractFallbackTransactionAttributeSource

	public TransactionAttribute getTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
		if (method.getDeclaringClass() == Object.class) {
			return null;
		}

		// 首先看是否存在缓存值
		Object cacheKey = getCacheKey(method, targetClass);
		TransactionAttribute cached = this.attributeCache.get(cacheKey);
		if (cached != null) {
			// 如果缓存的是一个无事务对象,直接返回 null
			if (cached == NULL_TRANSACTION_ATTRIBUTE) {
				return null;
			}
			else {
				return cached;
			}
		}
		// 缓存中不存在
		else {
			// 重新计算 TransactionAttribute 属性,根据方法和类的类型获取事务信息
			TransactionAttribute txAttr = computeTransactionAttribute(method, targetClass);
			// 存到缓存中
			if (txAttr == null) {
				this.attributeCache.put(cacheKey, NULL_TRANSACTION_ATTRIBUTE);
			}
			else {
				String methodIdentification = ClassUtils.getQualifiedMethodName(method, targetClass);
				if (txAttr instanceof DefaultTransactionAttribute) {
					((DefaultTransactionAttribute) txAttr).setDescriptor(methodIdentification);
				}
				if (logger.isTraceEnabled()) {
					logger.trace("Adding transactional method '" + methodIdentification + "' with attribute: " + txAttr);
				}
				// 将事务对象属性放到缓存中,缓存的key与类的类型和方法相关
				this.attributeCache.put(cacheKey, txAttr);
			}
			return txAttr;
		}
	}

进入computeTransactionAttribute 方法,父类中实现

类文件: org.springframework.transaction.interceptor.AbstractFallbackTransactionAttributeSource

	protected TransactionAttribute computeTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
		// 如果被代理方法非public类型,则直接返回空,Spring事务不支持非public类型方法
		if (allowPublicMethodsOnly() && !Modifier.isPublic(method.getModifiers())) {
			return null;
		}

		// 方法可能在接口上,但是我们需要目标类的属性。
		// 如果目标类为空,则方法将保持不变。
		Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass);

		// First try is the method in the target class.
		// 解析方法上的 @Transactional 属性
		TransactionAttribute txAttr = findTransactionAttribute(specificMethod);
		if (txAttr != null) {
			return txAttr;
		}

		// Second try is the transaction attribute on the target class.
		// 如果方法上没有事务属性,就从类上来获取
		txAttr = findTransactionAttribute(specificMethod.getDeclaringClass());
		if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
			return txAttr;
		}

		if (specificMethod != method) {
			// Fallback is to look at the original method.
			txAttr = findTransactionAttribute(method);
			if (txAttr != null) {
				return txAttr;
			}
			// Last fallback is the class of the original method.
			txAttr = findTransactionAttribute(method.getDeclaringClass());
			if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
				return txAttr;
			}
		}

		return null;
	}

知识点

被代理方法是否是public类型,就是在computeTransactionAttribute这个方法中进行校验的,Spring中的事务只对public类型的方法起作用,非public类型事务失效。

# @Transactional注解的解析

进入findTransactionAttribute 方法,

进入determineTransactionAttribute 方法,

进入parseTransactionAnnotation 方法,

类文件: org.springframework.transaction.annotation.SpringTransactionAnnotationParser

/**
 * TODO : 解析事务注解中的属性
 */
 protected TransactionAttribute parseTransactionAnnotation(AnnotationAttributes attributes) {
		RuleBasedTransactionAttribute rbta = new RuleBasedTransactionAttribute();
		// 获取事务传播属性
		Propagation propagation = attributes.getEnum("propagation");
		rbta.setPropagationBehavior(propagation.value());
		// 获取事务隔离等级
		Isolation isolation = attributes.getEnum("isolation");
		rbta.setIsolationLevel(isolation.value());
		// 获取事务获取事务超时时间
		rbta.setTimeout(attributes.getNumber("timeout").intValue());
		// 获取事务是否只读
		rbta.setReadOnly(attributes.getBoolean("readOnly"));
		// 获取事务事务管理器 bean的名称
		rbta.setQualifier(attributes.getString("value"));

		List<RollbackRuleAttribute> rollbackRules = new ArrayList<>();
		// 获取事务回滚相关配置
		for (Class<?> rbRule : attributes.getClassArray("rollbackFor")) {
			rollbackRules.add(new RollbackRuleAttribute(rbRule));
		}
		for (String rbRule : attributes.getStringArray("rollbackForClassName")) {
			rollbackRules.add(new RollbackRuleAttribute(rbRule));
		}
		for (Class<?> rbRule : attributes.getClassArray("noRollbackFor")) {
			rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
		}
		for (String rbRule : attributes.getStringArray("noRollbackForClassName")) {
			rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
		}
		rbta.setRollbackRules(rollbackRules);

		return rbta;
	}

到此为止,@Transactional注解的加载工作也完成,注解的属性值已近被加载到spring容器中了,接下来开始具体的事务操作过程。

# 事务的实现过程

创建事务之前的准备工作已经完成了,那么具体事务是如何创建实现的呢?

回到invokeWithinTransaction方法,

进入createTransactionIfNecessary 方法,

类文件: org.springframework.transaction.interceptor.TransactionAspectSupport

/**
 * TODO : 根据给定的事务属性创建事务对象
 */
 protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
			@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
		// 如果没有指定名称,则应用方法标识作为事务名称。
		// 读取事务方法调用的事务配置属性
		if (txAttr != null && txAttr.getName() == null) {
			// 如果事务名称为 null,则使用方法的名称(事务连接点标识)作为事务名称,
			// 调用一个实现 DelegatingTransactionAttribute 接口的匿名内部类
			txAttr = new DelegatingTransactionAttribute(txAttr) {
				@Override
				// 使用方法名称作为事务名称
				public String getName() {
					return joinpointIdentification;
				}
			};
		}
		// 事务状态封装了事务执行的状态信息
		TransactionStatus status = null;
		if (txAttr != null) {
			if (tm != null) {
				// 事务处理器创建事务,并且返回当前事务的状态信息
				// 核心方法,真正底层创建事务对象的方法
				status = tm.getTransaction(txAttr);
			}
			else {
				if (logger.isDebugEnabled()) {
					logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
							"] because no transaction manager has been configured");
				}
			}
		}
		// 准备事务信息,事务信息 TransactionInfo 封装了事务配置和状态信息
		return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
	}

进入getTransaction 方法,

类文件: org.springframework.transaction.support.AbstractPlatformTransactionManager

public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
		// doGetTransaction()方法是抽象方法,具体的实现由具体的事务处理器提供
		Object transaction = doGetTransaction();

		// Cache debug flag to avoid repeated checks.
		boolean debugEnabled = logger.isDebugEnabled();
		// 如果没有配置事务属性,则使用默认的事务属性
		if (definition == null) {
			// Use defaults if no transaction definition given.
			definition = new DefaultTransactionDefinition();
		}
		// 检查当前线程是否存在事务,如果已存在事务,那么需要根据在事务属性中定义的事务传播属性来处理事务的产生(检查传播行为)
		if (isExistingTransaction(transaction)) {
			// Existing transaction found -> check propagation behavior to find out how to behave.
			// 处理已存在的事务的情况
			return handleExistingTransaction(definition, transaction, debugEnabled);
		}

		// Check definition settings for new transaction.
		// 检查事务属性中timeout超时属性设置是否合理
		if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
			throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
		}

		// No existing transaction found -> check propagation behavior to find out how to proceed.
		// 对事务属性中配置的事务传播特性处理
		// 如果当前获取不到存在的事务,且事务传播特性配置的是 mandatory,当前没有事务存在,抛出异常
		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
			throw new IllegalTransactionStateException(
					"No existing transaction found for transaction marked with propagation 'mandatory'");
		}
		// 获取不到存在的事务,并且事务传播属性是 REQUIRED、REQUIRES_NEW、NESTED
		// 则执行事务挂起,新建事务。
		else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
				definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
				definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
			// 挂起当前事务
			SuspendedResourcesHolder suspendedResources = suspend(null);
			if (debugEnabled) {
				logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
			}
			try {
				// 不激活和当前线程绑定的事务,因为事务传播特性配置要求创建新的事务
				boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
				// 创建一个新的事务状态
				DefaultTransactionStatus status = newTransactionStatus(
						definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
				// 核心方法:创建事务的调用,具体实现由具体的事务处理器提供。
				// 设置事务隔离级别(spring默认的事务隔离级别是跟JDBC相同的,即默认情况Spring不设置事务隔离级别);
				doBegin(transaction, definition);
				// 初始化和同步事务状态
				prepareSynchronization(status, definition);
				return status;
			}
			catch (RuntimeException | Error ex) {
				resume(null, suspendedResources);
				throw ex;
			}
		}
		else {
			// Create "empty" transaction: no actual transaction, but potentially synchronization.
			if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
				logger.warn("Custom isolation level specified but no actual transaction initiated; " +
						"isolation level will effectively be ignored: " + definition);
			}
			// 创建空事务,针对 supported 类型的事务传播特性,激活和当前线程绑定的事务
			boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
			// 准备事务状态
			return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
		}
	}

知识点

抽象事务管理器AbstractPlatformTransactionManager提供了创建事务的模板,这个模板会被具体的事 务处理器所使用,抽象事务管理器根据事务属性配置和当前线程绑定信息对事务是否需要创建以及如何创建进行一 些通用的处理,然后把事务创建的底层细节交给具体的事务处理器实现。

进入doGetTransaction 方法,由jdbc包下的数据源类实现

类文件: org.springframework.jdbc.datasource.DataSourceTransactionManager

protected Object doGetTransaction() {
		// 管理 connection对象,创建回滚点,按照回滚点回滚,释放回滚点
		// DataSourceTransactionObject 就是真正的事务对象
		DataSourceTransactionObject txObject = new DataSourceTransactionObject();
		// DataSourceTransactionManager 默认是允许嵌套事务的
		txObject.setSavepointAllowed(isNestedTransactionAllowed());
		// obtainDataSource() 获取数据源对象,其实就是数据库连接对象
		// ConnectionHolder 持有了 Connection,并对 Connection进行了包装
		ConnectionHolder conHolder =
				(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
		txObject.setConnectionHolder(conHolder, false);
		return txObject;
	}

进入getResource方法,

类文件: org.springframework.transaction.support.TransactionSynchronizationManager

public static Object getResource(Object key) {
		// 查看数据源连接池有没有扩展,一般没有
		Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
		Object value = doGetResource(actualKey);
		if (value != null && logger.isTraceEnabled()) {
			logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" +
					Thread.currentThread().getName() + "]");
		}
		return value;
	}

进入doGetResource方法,

类文件: org.springframework.transaction.support.TransactionSynchronizationManager

private static Object doGetResource(Object actualKey) {
		// 事务默认传播属性会共用同一个连接,因此会去 ThreadLocal 中获取连接,如果存在就直接返回
		// map对应了当前数据源对象和 当前连接的绑定关系
		Map<Object, Object> map = resources.get();
		if (map == null) {
			return null;
		}
		Object value = map.get(actualKey);
		// Transparently remove ResourceHolder that was marked as void...
		if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
			map.remove(actualKey);
			// Remove entire ThreadLocal if empty...
			if (map.isEmpty()) {
				resources.remove();
			}
			value = null;
		}
		return value;
	}

执行到doGetResource,也就说明了,事务的执行默认使用了同一个数据库连接。

doGetTransaction 方法执行完返回了事务对象,返回到getTransaction方法中

进入doBegin方法

类文件: org.springframework.jdbc.datasource.DataSourceTransactionManager

protected void doBegin(Object transaction, TransactionDefinition definition) {
		DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
		Connection con = null;

		try {
			// 如果没有数据库连接
			if (!txObject.hasConnectionHolder() ||
					txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
				// 从连接池里面获取连接
				Connection newCon = obtainDataSource().getConnection();
				if (logger.isDebugEnabled()) {
					logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
				}
				// 把连接包装成 ConnectionHolder,然后设置到事务对象中
				txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
			}

			txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
			con = txObject.getConnectionHolder().getConnection();
			// 从数据库连接中获取隔离级别,Mysql默认隔离级别是 可重复读
			Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
			txObject.setPreviousIsolationLevel(previousIsolationLevel);

			if (con.getAutoCommit()) {
				txObject.setMustRestoreAutoCommit(true);
				if (logger.isDebugEnabled()) {
					logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
				}
				// 关闭连接的自动提交,其实这步就是开启了事务
				con.setAutoCommit(false);
			}
			// 设置只读事务 从这一点设置的时间点开始(时间点a)到这个事务结束的过程中,
			// 其他事务所提交的数据,该事务将看不见!
			// 设置只读事务就是告诉数据库,我这个事务内没有新增,修改,删除操作
			// 只有查询操作,不需要数据库锁等操作,减少数据库压力
			prepareTransactionalConnection(con, definition);
			// 自己提交关闭了,就说明已经开启事务了,事务是活动的
			txObject.getConnectionHolder().setTransactionActive(true);

			int timeout = determineTimeout(definition);
			if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
				txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
			}

			// Bind the connection holder to the thread.
			if (txObject.isNewConnectionHolder()) {
				// 如果是新创建的事务,则建立当前线程和数据库连接的关系
				TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
			}
		}

		catch (Throwable ex) {
			if (txObject.isNewConnectionHolder()) {
				DataSourceUtils.releaseConnection(con, obtainDataSource());
				txObject.setConnectionHolder(null, false);
			}
			throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
		}
	}

doBegin方法中的 con.setAutoCommit(false)执行完,正式开启了Spring 事务管理功能。

# 事务问题汇总

问题一

AutoProxyRegistrar是AOP的入口类,之前@EnableAspectJAutoProxy也会通过AutoProxyRegistrar生成一个AOP的入口类,那么AOP的入口类会有两个吗?

参考答案

不会,Spring AOP创建的入口类名字默认都叫org.springframework.aop.config.internalAutoProxyCreator,并且优先级设置在AopConfigUtils类中:


 static {
       // 事务代理 入口类优 先级最低
		APC_PRIORITY_LIST.add(InfrastructureAdvisorAutoProxyCreator.class);
		// XML代理 入口类 优先级其次
		APC_PRIORITY_LIST.add(AspectJAwareAdvisorAutoProxyCreator.class);
		// AspectJ代理 入口类 优先级最高
		APC_PRIORITY_LIST.add(AnnotationAwareAspectJAutoProxyCreator.class);
    }

由此可见,AspectJ的入口类AnnotationAwareAspectJAutoProxyCreator优先级最高,也就是说如果设置了切面,那么入口类只会有AnnotationAwareAspectJAutoProxyCreator会起作用。

问题二

两个方法上都使用了@Transactional默认传播属性,内部是如何保证使用了同一个连接的?

参考答案

事务默认传播属性会共用同一个连接,内部是通过ThreadLocal来保证连接的重复可用。默认第一次肯定不会从Map中拿到连接,直接返回null,只有第二次进来get()的值才不会为空。

private static Object doGetResource(Object actualKey) {
		// 事务默认传播属性会共用同一个连接,因此会去 ThreadLocal 中获取连接,
		// 如果存在就直接返回,否则创建新的连接
		// Map保存了当前数据源对象 和 当前连接的绑定关系
		Map<Object, Object> map = resources.get();
		if (map == null) {
			return null;
		}
		Object value = map.get(actualKey);
		// Transparently remove ResourceHolder that was marked as void...
		if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
			map.remove(actualKey);
			// Remove entire ThreadLocal if empty...
			if (map.isEmpty()) {
				resources.remove();
			}
			value = null;
		}
		return value;
	}